
bigquery: add schema resolver, evolution, and table name sanitization #4385

Open
squiidz wants to merge 2 commits into main from bq-schema-management

Conversation

squiidz (Contributor) commented May 1, 2026

Schema resolution + evolution

  • Add schemaResolver that fetches table metadata, builds a proto descriptor via the adapt pipeline, caches per table, and deduplicates concurrent cold-cache loads via singleflight.
  • Add schemaEvolver that diffs proto descriptor vs table schema and adds missing columns. Uses ETag with CAS-on-412 retry (max 5 attempts) so concurrent additive evolutions don't clobber each other; on retry exhaustion or a benign "another writer added the columns" outcome, signal the caller to retry the write.
  • Add grpcSchemaMismatch classification via StorageError details. In handleWriteError: on schema mismatch, evolve and retry; on evolution failure, return a permanent BatchError so messages go to DLQ instead of being retried forever.
  • Add bigquery_write_api_schema_evolutions_total and bigquery_write_api_schema_evolution_failures_total counters.

Table name handling

  • Sanitize interpolated table names (dots, hyphens, slashes, and whitespace become underscores; non-ASCII-alphanumerics are stripped; leading digits prefixed with _; cap at 1024 chars).
  • Reject empty-after-sanitization names with a permanent BatchError.
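The sanitization rules above translate roughly to the following sketch; the PR's real implementation may differ in details such as rune classification order:

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// sanitizeTableName applies the rules described above: separators become
// underscores, other non-ASCII-alphanumeric runes are stripped, a leading
// digit is prefixed with '_', and the result is capped at 1024 bytes.
func sanitizeTableName(name string) string {
	var b strings.Builder
	for _, r := range name {
		switch {
		case r == '.' || r == '-' || r == '/' || unicode.IsSpace(r):
			b.WriteByte('_') // separators become underscores
		case r == '_' ||
			(r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') ||
			(r >= '0' && r <= '9'):
			b.WriteRune(r)
			// everything else (non-ASCII alphanumerics, punctuation) is stripped
		}
	}
	s := b.String()
	if s != "" && s[0] >= '0' && s[0] <= '9' {
		s = "_" + s // BigQuery-safe names should not start with a digit
	}
	if len(s) > 1024 {
		s = s[:1024]
	}
	return s
}

func main() {
	fmt.Println(sanitizeTableName("2026-05.events/raw")) // → _2026_05_events_raw
}
```

An input that sanitizes to the empty string (e.g. all punctuation) is the case the second bullet rejects with a permanent BatchError.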

Error classification

  • Add isPermanentBQError that recognises both gRPC permanent codes and *googleapi.Error 4xx (excluding 408/429), so REST 4xx errors from the resolver path no longer retry forever.
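The REST half of that rule is simple enough to sketch in isolation (a stand-in predicate over HTTP status codes, not the PR's `isPermanentBQError` itself, which also inspects gRPC codes and unwraps `*googleapi.Error`):

```go
package main

import "fmt"

// isPermanentHTTPStatus mirrors the REST-side rule: any 4xx is permanent
// except 408 (request timeout) and 429 (rate limited), which are transient
// and worth retrying. 5xx and everything else is left to the retry path.
func isPermanentHTTPStatus(code int) bool {
	if code == 408 || code == 429 {
		return false
	}
	return code >= 400 && code < 500
}

func main() {
	for _, c := range []int{400, 404, 408, 429, 500} {
		fmt.Println(c, isPermanentHTTPStatus(c))
	}
}
```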

Lifecycle and concurrency

  • Drop bqClient/datasetID fields from resolver and evolver; pass client + dataset as method arguments so they can't outlive the client. Close drains the resolver cache.
  • Track previously fire-and-forget stream closes (evictStream and the idle sweeper) on a closeWg so Close waits for them before tearing down the underlying clients.
  • Snapshot client and resolvedProjectID together under one connMu RLock in WriteBatch; pass projectID through to tableCacheKey.
  • Run client closes unconditionally in Close — the bigquery and managedwriter clients are non-blocking gRPC teardowns; gating them on ctx.Err() leaks connections.

Config validation and tuning

  • Reject delegates without target_principal at parse time.
  • Lower max_in_flight default from 64 to 4 to match snowflake_streaming and iceberg sibling outputs.
  • Detach the impersonate token source from Connect's ctx with context.WithoutCancel so a cancelled connect doesn't break later token refreshes.
  • Warn explicitly that endpoint overrides disable authentication.

Cleanup

  • Drop dead fieldNameMapping plumbing (resolvedSchema field, jsonToProtoBytes parameter, streamWithDescriptor field) — never wired in production.
  • Drop unused *service.Message argument from Resolve.
  • Reject non-positive proto Kind in protoKindToBQFieldType instead of silently coercing to STRING; cap RECORD nesting at 15 (BigQuery's own limit) to fail fast on self-referential descriptors.
  • Set Repeated on field schemas for repeated proto fields.
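The depth cap in the third bullet can be illustrated with a toy schema walker; `field` and `checkDepth` are hypothetical stand-ins for the descriptor traversal, chosen to show why capping at 15 also terminates self-referential descriptors:

```go
package main

import (
	"errors"
	"fmt"
)

const maxRecordDepth = 15 // BigQuery's nested-record limit

// field is a toy schema node; a non-nil children slice models a RECORD.
type field struct {
	name     string
	children []*field
}

// checkDepth fails fast once nesting exceeds the limit. A self-referential
// descriptor hits the cap instead of recursing forever.
func checkDepth(fs []*field, depth int) error {
	if depth > maxRecordDepth {
		return errors.New("RECORD nesting exceeds BigQuery limit of 15")
	}
	for _, f := range fs {
		if f.children != nil {
			if err := checkDepth(f.children, depth+1); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	self := &field{name: "node"}
	self.children = []*field{self} // self-referential descriptor
	fmt.Println(checkDepth([]*field{self}, 1))
}
```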

Tests

  • Unit tests for resolver (cache, evict, no-client fallback), evolver helpers (kind mapping, schema diff, repeated fields), buildAuthOpts (endpoint override, credentials_json, no-auth), error classification (gRPC permanent, REST 4xx, wrapped), table-name sanitization, sweep goroutine, config validation (durations, delegates).
  • Integration tests for schema evolution and table-name sanitization against the goccy bigquery emulator.

claude Bot commented May 1, 2026

Commits

The single commit bigquery: add schema resolver, evolution, and table name sanitization follows the `system: message` format and is descriptive. The change set is large (~1100 LOC across 10 files) and bundles several independent concerns: schema resolution, evolution, and table-name sanitization clearly belong together, but the max_in_flight default change (64 → 4), the lifecycle/concurrency cleanup (closeWg, context.WithoutCancel on the impersonate token source), and the dead-field/parameter cleanup (fieldNameMapping, streamWithDescriptor.fieldNameMapping, Resolve's unused *service.Message argument) are arguably unrelated to the headline feature and could have been split per CLAUDE.md commit-granularity guidance. Worth noting, not blocking.

Review

Schema resolver/evolver design is well thought out (singleflight on cold-cache loads, ETag-based CAS retry on 412, separate gRPC SCHEMA_MISMATCH_EXTRA_FIELDS classification, permanent-vs-transient split for REST 4xx via googleapi.Error). Lifecycle fixes around closeWg, snapshotting client+projectID together, and unconditional client teardown all read correctly. Tests cover the new behavior thoroughly: table-driven success+error cases for sanitization, kind mapping, schema diff, repeated fields, error classification, and config validation; integration tests use the goccy emulator with the proper service.NewStreamBuilder + license.InjectTestService pattern.

LGTM

createStream caches the ManagedStream for reuse across every batch that routes to the same table. If that stream is bound to the per-batch ctx, the first batch's cancellation (per-message deadline, source shutdown, ack timeout) will sever the cached stream, blocking every later AppendRows against it until the idle sweeper evicts the entry.

Wrap it with context.WithoutCancel so cancellation propagates only to the in-flight NewManagedStream dial, not to the long-lived stream itself.
claude Bot commented May 4, 2026

Commits
LGTM

Review
Reviewed the bigquery output changes (schema resolver, evolver, table-name sanitization, lifecycle/concurrency tightening, REST 4xx classification, and the cached-stream ctx detach). Patterns match the project: RCL headers, field-name constants with the new bqwaep nested prefix, batch-error construction, mutex/ctx propagation, and tests cover the new helpers and integration paths against the emulator.

LGTM
